Flink 您所在的位置:网站首页 flink 表 Flink

Flink

#Flink| 来源: 网络整理| 查看: 265

Flink Table API Flink SQL

Table API Flink SQL

在这里插入图片描述

Flink 对批处理和流处理,提供了统一的上层APITable API是一套内嵌在Java和Scala语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询Flink 的SQL 支持基于实现了SQL标准的Apache Calcite

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。

依赖导入:

org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-csv ${flink.version} 简单案例 StreamExecutionEnvironment转换为StreamTableEnvironment public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource dataStream = env.readTextFile("data/temps.txt"); SingleOutputStreamOperator beanDataStream = dataStream.map(new MapFunction() { @Override public TempInfo map(String value) throws Exception { String[] split = value.split(","); return new TempInfo(split[0], new Long(split[1]), new Double(split[2])); } }); // 执行环境创建 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 构建table 对象 Table table = tableEnv.fromDataStream(beanDataStream); // table.select("id,timeStamp") 过期 // flink 1.12.0 中写法 /* Table resultTable = table.select($("id"), $("timeStamp")) .where($("id").isEqual("1")); tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");*/ // SQL 语句构建查询 tableEnv.createTemporaryView("sensor",table); String sql = "select * from sensor where id = '1' "; Table sqlTab = tableEnv.sqlQuery(sql); tableEnv.toAppendStream(sqlTab, Row.class).print("sqlTable"); env.execute(); } 基本程序结构

Table API和SQL的程序结构,与流处理的程序结构十分类似

StreamTableEnviroment tableEnv = ... //创建表的执行环境 //创建一张表,用于读取数据 tableEnv.connect(...).createTemporaryTable("inputTable") // 注册一张表,用于把计算结果输出 tableEnv.connect(...).createTemporaryTable("outputTable"); // 通过Table API 查询 Table result = tableEnv.from("inputTable").select(...); // 通过SQL查询语句,得到一张结果表 Table sqlResult = tableEnv.sqlQuery("select ... from inputTable ... "); // 将结果表写入输出表中 result.insertInto("outputTable"); 表 Table TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表表(Table)是由一个"标识符"(identifier)来指定的,由3部分组成:Catalog名,数据库(database)名和对象名表可以是常规的,也可以是虚拟的(视图,View)常规表(Table)一般可以用来描述外部数据,比如文件,数据库表或消息队列的数据,也可以直接从DataStream转换而来视图(View)可以从现有的表中创建,通常是table API或者SQL查询的一个结果集 创建TableEnvironment

创建表的执行环境,需要将flink流处理的执行环境传入

StreamTableEnvironment tableEnv = StreamTableEnviroment.creat(env);

TableEnvironment是flink中集成Table API和SQL的核心概念,所有对表的操作都基于TableEnvironment

注册Catalog在Catalog中注册表执行SQL查询注册用户自定义函数(UDF)

构建不同运行环境代码

public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1.1 基于老版本 Planner 的流处理 EnvironmentSettings oldEnvSettings = EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() .build(); StreamTableEnvironment oldstreamTableEnv = StreamTableEnvironment.create(env, oldEnvSettings); // 1.2 基于老版本 Planner 的批处理 ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment oldbatchTableEnv = BatchTableEnvironment.create(batchEnv); // 1.3 基于 blink 的流处理 EnvironmentSettings blinkEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment blinkstreamTableEnv = StreamTableEnvironment.create(env, blinkEnvSettings); // 1.4 基于 blink 的批处理 EnvironmentSettings batchEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment blinkBatchTableEnv = TableEnvironment.create(batchEnvSettings); } 创建表

TableEnvironment可以调用.connect()方法,连接外部系统,并调用.createTemporaryTable()方法,在Catalog中注册表

tableEnv .connect(...) //定义表的数据来源,和外部系统建立连接 .withFormat(...) //定义数据格式化方法 .withSchema(...) //定义表结构 .createTemporaryTable("MyTable"); // 创建临时表 表的查询 – Table API

Table API是集成在Scala和Java语言内的查询API

Table APi基于代表"表"的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果

有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

Table sensorTable = tableEnv.from("inputTable"); Table resultTable = sensorTable .select("id, temperature") .filter("id = 'sensor_1' ")

查询案例

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String filepath = "data/temps.txt"; // flink 1.12.0 中建议不要使用,以后可能会被重构 请使用executeSql tableEnv.connect(new FileSystem().path(filepath)) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temp", DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable"); Table inputTable = tableEnv.from("inputTable"); // tableEnv.toAppendStream(inputTable, Row.class).print("inputTable"); Table resultTable = inputTable.select($("id"), $("timestamp")) .filter($("id").isEqual("1")); Table aggTable = inputTable.groupBy($("id")) .select($("id"), $("temp").avg()); //SQL 写法已过期 /*Table aggTable = inputTable.groupBy("id") .select("id, id.count as count, temp.avg as avgTemp");*/ tableEnv.sqlQuery("select id , temp, from inputTable where id = '1' "); Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgtemp from inputTable group by(id)"); tableEnv.toAppendStream(resultTable,Row.class).print("result"); tableEnv.toRetractStream(aggTable,Row.class).print("aggTable"); tableEnv.toRetractStream(sqlAggTable,Row.class).print("sqlAggTable"); env.execute(); } 创建表(文件写入)

创建Table来描述文件数据,他可以从文件中读取,或者将数据写入文件

tableEnv .connect( new FileSystem().path("Path/xx.txt") ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义以csv格式进行数据格式化 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("sensorTable"); // 创建临时表

案例

String outpath = "data/out.txt"; tableEnv .connect( // // flink 1.12.0 中已过期 new FileSystem().path(outpath) ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义以csv格式进行数据格式化 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) ) // 定义表结构 .createTemporaryTable("outputTable"); // 创建临时表 // flink 1.12.0 中已过期 resultTable.insertInto("outputTable");


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有